# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import requests
import uuid

from six.moves.urllib import parse as urlparse

from keystoneauth1.identity import v3
from keystoneauth1 import session
from keystoneclient.tests.unit import client_fixtures
from keystoneclient.tests.unit import utils
from keystoneclient.v3 import client


def parameterize(ref):
    """Rewrite attributes to match the kwarg naming convention in client.

    >>> parameterize({'project_id': 0})
    {'project': 0}

    """
    params = ref.copy()
    for key in ref:
        if key[-3:] == '_id':
            params.setdefault(key[:-3], params.pop(key))
    return params


class UnauthenticatedTestCase(utils.TestCase):
    """Class used as base for unauthenticated calls."""

    TEST_ROOT_URL = 'http://127.0.0.1:5000/'
    TEST_URL = '%s%s' % (TEST_ROOT_URL, 'v3')
    TEST_ROOT_ADMIN_URL = 'http://127.0.0.1:35357/'
    TEST_ADMIN_URL = '%s%s' % (TEST_ROOT_ADMIN_URL, 'v3')


class TestCase(UnauthenticatedTestCase):

    TEST_ADMIN_IDENTITY_ENDPOINT = "http://127.0.0.1:35357/v3"

    TEST_SERVICE_CATALOG = [{
        "endpoints": [{
            "url": "http://cdn.admin-nets.local:8774/v1.0/",
            "region": "RegionOne",
            "interface": "public"
        }, {
            "url": "http://127.0.0.1:8774/v1.0",
            "region": "RegionOne",
            "interface": "internal"
        }, {
            "url": "http://cdn.admin-nets.local:8774/v1.0",
            "region": "RegionOne",
            "interface": "admin"
        }],
        "type": "nova_compat"
    }, {
        "endpoints": [{
            "url": "http://nova/novapi/public",
            "region": "RegionOne",
            "interface": "public"
        }, {
            "url": "http://nova/novapi/internal",
            "region": "RegionOne",
            "interface": "internal"
        }, {
            "url": "http://nova/novapi/admin",
            "region": "RegionOne",
            "interface": "admin"
        }],
        "type": "compute"
    }, {
        "endpoints": [{
            "url": "http://glance/glanceapi/public",
            "region": "RegionOne",
            "interface": "public"
        }, {
            "url": "http://glance/glanceapi/internal",
            "region": "RegionOne",
            "interface": "internal"
        }, {
            "url": "http://glance/glanceapi/admin",
            "region": "RegionOne",
            "interface": "admin"
        }],
        "type": "image",
        "name": "glance"
    }, {
        "endpoints": [{
            "url": "http://127.0.0.1:5000/v3",
            "region": "RegionOne",
            "interface": "public"
        }, {
            "url": "http://127.0.0.1:5000/v3",
            "region": "RegionOne",
            "interface": "internal"
        }, {
            "url": TEST_ADMIN_IDENTITY_ENDPOINT,
            "region": "RegionOne",
            "interface": "admin"
        }],
        "type": "identity"
    }, {
        "endpoints": [{
            "url": "http://swift/swiftapi/public",
            "region": "RegionOne",
            "interface": "public"
        }, {
            "url": "http://swift/swiftapi/internal",
            "region": "RegionOne",
            "interface": "internal"
        }, {
            "url": "http://swift/swiftapi/admin",
            "region": "RegionOne",
            "interface": "admin"
        }],
        "type": "object-store"
    }]

    def stub_auth(self, subject_token=None, **kwargs):

        if not subject_token:
            subject_token = self.TEST_TOKEN

        try:
            response_list = kwargs['response_list']
        except KeyError:
            headers = kwargs.setdefault('headers', {})
            headers['X-Subject-Token'] = subject_token
        else:
            for resp in response_list:
                headers = resp.setdefault('headers', {})
                headers['X-Subject-Token'] = subject_token

        self.stub_url('POST', ['auth', 'tokens'], **kwargs)


class ClientTestCase(utils.ClientTestCaseMixin, TestCase):

    ORIGINAL_CLIENT_TYPE = 'original'
    KSC_SESSION_CLIENT_TYPE = 'ksc-session'
    KSA_SESSION_CLIENT_TYPE = 'ksa-session'

    scenarios = [
        (
            ORIGINAL_CLIENT_TYPE, {
                'client_fixture_class': client_fixtures.OriginalV3,
                'client_type': ORIGINAL_CLIENT_TYPE
            }
        ),
        (
            KSC_SESSION_CLIENT_TYPE, {
                'client_fixture_class': client_fixtures.KscSessionV3,
                'client_type': KSC_SESSION_CLIENT_TYPE
            }
        ),
        (
            KSA_SESSION_CLIENT_TYPE, {
                'client_fixture_class': client_fixtures.KsaSessionV3,
                'client_type': KSA_SESSION_CLIENT_TYPE
            }
        )

    ]

    @property
    def is_original_client(self):
        return self.client_type == self.ORIGINAL_CLIENT_TYPE

    @property
    def is_session_client(self):
        return self.client_type in (self.KSC_SESSION_CLIENT_TYPE,
                                    self.KSA_SESSION_CLIENT_TYPE)


class CrudTests(object):
    key = None
    collection_key = None
    model = None
    manager = None
    path_prefix = None

    def new_ref(self, **kwargs):
        kwargs.setdefault('id', uuid.uuid4().hex)
        kwargs.setdefault(uuid.uuid4().hex, uuid.uuid4().hex)
        return kwargs

    def encode(self, entity):
        if isinstance(entity, dict):
            return {self.key: entity}
        if isinstance(entity, list):
            return {self.collection_key: entity}
        raise NotImplementedError('Are you sure you want to encode that?')

    def stub_entity(self, method, parts=None, entity=None, id=None, **kwargs):
        if entity:
            entity = self.encode(entity)
            kwargs['json'] = entity

        if not parts:
            parts = [self.collection_key]

            if self.path_prefix:
                parts.insert(0, self.path_prefix)

        if id:
            if not parts:
                parts = []

            parts.append(id)

        self.stub_url(method, parts=parts, **kwargs)

    def assertEntityRequestBodyIs(self, entity):
        self.assertRequestBodyIs(json=self.encode(entity))

    def test_create(self, ref=None, req_ref=None):
        deprecations = self.useFixture(client_fixtures.Deprecations())
        deprecations.expect_deprecations()
        ref = ref or self.new_ref()
        manager_ref = ref.copy()
        manager_ref.pop('id')

        # req_ref argument allows you to specify a different
        # signature for the request when the manager does some
        # conversion before doing the request (e.g. converting
        # from datetime object to timestamp string)
        if req_ref:
            req_ref = req_ref.copy()
        else:
            req_ref = ref.copy()
            req_ref.pop('id')

        self.stub_entity('POST', entity=req_ref, status_code=201)

        returned = self.manager.create(**parameterize(manager_ref))
        self.assertIsInstance(returned, self.model)
        for attr in req_ref:
            self.assertEqual(
                getattr(returned, attr),
                req_ref[attr],
                'Expected different %s' % attr)
        self.assertEntityRequestBodyIs(req_ref)

        # The entity created here may be used in other test cases
        return returned

    def test_get(self, ref=None):
        ref = ref or self.new_ref()

        self.stub_entity('GET', id=ref['id'], entity=ref)

        returned = self.manager.get(ref['id'])
        self.assertIsInstance(returned, self.model)
        for attr in ref:
            self.assertEqual(
                getattr(returned, attr),
                ref[attr],
                'Expected different %s' % attr)

    def _get_expected_path(self, expected_path=None):
        if not expected_path:
            if self.path_prefix:
                expected_path = 'v3/%s/%s' % (self.path_prefix,
                                              self.collection_key)
            else:
                expected_path = 'v3/%s' % self.collection_key

        return expected_path

    def test_list_by_id(self, ref=None, **filter_kwargs):
        """Test ``entities.list(id=x)`` being rewritten as ``GET /v3/entities/x``.

        This tests an edge case of each manager's list() implementation, to
        ensure that it "does the right thing" when users call ``.list()``
        when they should have used ``.get()``.

        """
        if 'id' not in filter_kwargs:
            ref = ref or self.new_ref()
            filter_kwargs['id'] = ref['id']

        self.assertRaises(TypeError, self.manager.list, **filter_kwargs)

    def test_list(self, ref_list=None, expected_path=None,
                  expected_query=None, **filter_kwargs):
        ref_list = ref_list or [self.new_ref(), self.new_ref()]
        expected_path = self._get_expected_path(expected_path)

        self.requests_mock.get(urlparse.urljoin(self.TEST_URL, expected_path),
                               json=self.encode(ref_list))

        returned_list = self.manager.list(**filter_kwargs)
        self.assertEqual(len(ref_list), len(returned_list))
        [self.assertIsInstance(r, self.model) for r in returned_list]

        qs_args = self.requests_mock.last_request.qs
        qs_args_expected = expected_query or filter_kwargs
        for key, value in qs_args_expected.items():
            self.assertIn(key, qs_args)
            # The querystring value is a list. Note we convert the value to a
            # string and lower, as the query string is always a string and the
            # filter_kwargs may contain non-string values, for example a
            # boolean, causing the comaprison to fail.
            self.assertIn(str(value).lower(), qs_args[key])

        # Also check that no query string args exist which are not expected
        for key in qs_args:
            self.assertIn(key, qs_args_expected)

    def test_list_params(self):
        ref_list = [self.new_ref()]
        filter_kwargs = {uuid.uuid4().hex: uuid.uuid4().hex}
        expected_path = self._get_expected_path()

        self.requests_mock.get(urlparse.urljoin(self.TEST_URL, expected_path),
                               json=self.encode(ref_list))

        self.manager.list(**filter_kwargs)
        self.assertQueryStringContains(**filter_kwargs)

    def test_find(self, ref=None):
        ref = ref or self.new_ref()
        ref_list = [ref]

        self.stub_entity('GET', entity=ref_list)

        returned = self.manager.find(name=getattr(ref, 'name', None))
        self.assertIsInstance(returned, self.model)
        for attr in ref:
            self.assertEqual(
                getattr(returned, attr),
                ref[attr],
                'Expected different %s' % attr)

        if hasattr(ref, 'name'):
            self.assertQueryStringIs('name=%s' % ref['name'])
        else:
            self.assertQueryStringIs('')

    def test_update(self, ref=None, req_ref=None):
        deprecations = self.useFixture(client_fixtures.Deprecations())
        deprecations.expect_deprecations()
        ref = ref or self.new_ref()

        self.stub_entity('PATCH', id=ref['id'], entity=ref)

        # req_ref argument allows you to specify a different
        # signature for the request when the manager does some
        # conversion before doing the request (e.g. converting
        # from datetime object to timestamp string)
        if req_ref:
            req_ref = req_ref.copy()
        else:
            req_ref = ref.copy()
            req_ref.pop('id')

        returned = self.manager.update(ref['id'], **parameterize(req_ref))
        self.assertIsInstance(returned, self.model)
        for attr in ref:
            self.assertEqual(
                getattr(returned, attr),
                ref[attr],
                'Expected different %s' % attr)
        self.assertEntityRequestBodyIs(req_ref)

    def test_delete(self, ref=None):
        ref = ref or self.new_ref()

        self.stub_entity('DELETE', id=ref['id'], status_code=204)
        self.manager.delete(ref['id'])


class TestRequestId(TestCase):
    resp = requests.Response()
    TEST_REQUEST_ID = uuid.uuid4().hex
    resp.headers['x-openstack-request-id'] = TEST_REQUEST_ID

    def setUp(self):
        super(TestRequestId, self).setUp()
        auth = v3.Token(auth_url='http://127.0.0.1:5000',
                        token=self.TEST_TOKEN)
        session_ = session.Session(auth=auth)
        self.client = client.Client(session=session_,
                                    include_metadata='True')._adapter
